# More efficient broadcast of arrays with memmap

Data movement is where IPython's naive model suffers the most.
But knowing about your cluster lets you make smarter decisions about data movement than a simple `rc[:].push`.

I ran this example with a cluster on a 64-core remote VM,
so communication between the client and controller is over the public internet,
while communication between the controller and engines is local.

This is an example of 'broadcasting' a numpy array using memmapped files,
to reduce the amount of expensive network traffic when several engines are on the same host.

In [1]:
import socket
import os, sys, re

import numpy as np

import ipyparallel as ipp

In [7]:
rc = ipp.Client(profile="mpi")
eall = rc.broadcast_view(coalescing=True)

First, create a map of engine id to hostname

In [8]:
engine_hosts = eall.apply_async(socket.gethostname).get_dict()
engine_hosts

{0: 'ip-172-31-2-77',
 1: 'ip-172-31-2-77',
 2: 'ip-172-31-2-77',
 3: 'ip-172-31-2-77',
 4: 'ip-172-31-2-77',
 5: 'ip-172-31-2-77',
 6: 'ip-172-31-2-77',
 7: 'ip-172-31-2-77',
 8: 'ip-172-31-2-77',
 9: 'ip-172-31-2-77',
 10: 'ip-172-31-2-77',
 11: 'ip-172-31-2-77',
 12: 'ip-172-31-2-77',
 13: 'ip-172-31-2-77',
 14: 'ip-172-31-2-77',
 15: 'ip-172-31-2-77',
 16: 'ip-172-31-2-77',
 17: 'ip-172-31-2-77',
 18: 'ip-172-31-2-77',
 19: 'ip-172-31-2-77',
 20: 'ip-172-31-2-77',
 21: 'ip-172-31-2-77',
 22: 'ip-172-31-2-77',
 23: 'ip-172-31-2-77',
 24: 'ip-172-31-2-77',
 25: 'ip-172-31-2-77',
 26: 'ip-172-31-2-77',
 27: 'ip-172-31-2-77',
 28: 'ip-172-31-2-77',
 29: 'ip-172-31-2-77',
 30: 'ip-172-31-2-77',
 31: 'ip-172-31-2-77',
 32: 'ip-172-31-2-77',
 33: 'ip-172-31-2-77',
 34: 'ip-172-31-2-77',
 35: 'ip-172-31-2-77',
 36: 'ip-172-31-2-77',
 37: 'ip-172-31-2-77',
 38: 'ip-172-31-2-77',
 39: 'ip-172-31-2-77',
 40: 'ip-172-31-2-77',
 41: 'ip-172-31-2-77',
 42: 'ip-172-31-2-77',
 43: 'ip-172-31-2-77'

Next, reverse that to create a map of hostname to engine ids

In [9]:
host_engines = {}

for eid, host in engine_hosts.items():
    if host not in host_engines:
        host_engines[host] = []
    host_engines[host].append(eid)

host_engines

{'ip-172-31-2-77': [0,
  1,
  2,
  3,
  4,
  5,
  6,
  7,
  8,
  9,
  10,
  11,
  12,
  13,
  14,
  15,
  16,
  17,
  18,
  19,
  20,
  21,
  22,
  23,
  24,
  25,
  26,
  27,
  28,
  29,
  30,
  31,
  32,
  33,
  34,
  35,
  36,
  37,
  38,
  39,
  40,
  41,
  42,
  43,
  44,
  45,
  46,
  47,
  48,
  49,
  50,
  51,
  52,
  53,
  54,
  55,
  56,
  57,
  58,
  59,
  60,
  61,
  62,
  63]}

Now we can measure our baseline overhead: how long does it take to roundrip an empty task on all engines.
We shouldn't expect anything to take less time than this.

In [10]:
%time _ = eall.apply_sync(lambda : None)

CPU times: user 165 ms, sys: 40.9 ms, total: 206 ms
Wall time: 1.06 s


Now let's look at how long it takes to send data in the simplest possible way

In [19]:
import numpy as np
data = np.random.random((512, 512))

In [20]:
%%time
ar = rc[:].push({'data': data}, block=False)
ar.wait_interactive()

_push:   0%|          | 0/64 [00:00<?, ?tasks/s]

CPU times: user 601 ms, sys: 235 ms, total: 836 ms
Wall time: 14.6 s


Here we get to the broadcast implementation. Instead of seinding the array directly to every engine via IPP,
we:

1. lookup each engine's host
2. pick one engine on each host
3. send the data to one engine per host
4. on all engines, load the memmapped array from disk

This results in the same data to all engines, but only one send per remote *host* instead of per remote *engine*.

In [21]:
%px import numpy as np

In [22]:
@ipp.interactive
def array_to_file(A, name):
    """write an array to a temporary file, return its filename"""
    import tempfile
    with tempfile.NamedTemporaryFile(suffix='.np', delete=False) as tf:
        np.save(tf, A)
        data_path = tf.name
    if name:
        globals()[name] = data_path
    return data_path

In [23]:
@ipp.interactive
def load_memmap(name, path, mode='r+'):
    """load a file on disk into the interactive namespace as a memmapped array"""
    globals()[name] = np.memmap(path, mode=mode)

In [24]:
def bcast_memmap(data, name, client, host_engines):
    """broadcast a numpy array efficiently
    
    - sends data to each remote host only once
    - loads with memmap everywhere
    """

    # actually push the data, just once to each machine
    memmap_path_name = f"_bcast_array_{name}"
    
    one_per_host = rc.broadcast_view([engines[0] for engines in host_engines.values()], coalescing=True)
    send_ar = one_per_host.apply_async(array_to_file, data, name=memmap_path_name)
    
    # load the data on all engines into a memmapped array
    async_results = []
    e_all = rc.broadcast_view(coalescing=True)
    return e_all.apply_async(load_memmap, name, ipp.Reference(memmap_path_name))


In [25]:
%%time
ar = bcast_memmap(data, 'data', rc, host_engines)
ar.wait_interactive()

load_memmap:   0%|          | 0/64 [00:00<?, ?tasks/s]

CPU times: user 237 ms, sys: 50.8 ms, total: 288 ms
Wall time: 1.65 s


So that's a lot quicker! And a lot less memory used in both the client and the scheduler.

In [26]:
%px np.linalg.norm(data, 2)

[0;31mOut[0:6]: [0m217636.91910151643

[0;31mOut[1:6]: [0m255.8632962531706

[0;31mOut[2:6]: [0m255.8632962531706

[0;31mOut[3:6]: [0m255.8632962531706

[0;31mOut[4:6]: [0m255.8632962531706

[0;31mOut[5:6]: [0m255.8632962531706

[0;31mOut[6:6]: [0m255.8632962531706

[0;31mOut[7:6]: [0m255.8632962531706

[0;31mOut[8:6]: [0m255.8632962531706

[0;31mOut[9:6]: [0m255.8632962531706

[0;31mOut[10:6]: [0m255.8632962531706

[0;31mOut[11:6]: [0m255.8632962531706

[0;31mOut[12:6]: [0m255.8632962531706

[0;31mOut[13:6]: [0m255.8632962531706

[0;31mOut[14:6]: [0m255.8632962531706

[0;31mOut[15:6]: [0m255.8632962531706

[0;31mOut[16:6]: [0m255.8632962531706

[0;31mOut[17:6]: [0m255.8632962531706

[0;31mOut[18:6]: [0m255.8632962531706

[0;31mOut[19:6]: [0m255.8632962531706

[0;31mOut[20:6]: [0m255.8632962531706

[0;31mOut[21:6]: [0m255.8632962531706

[0;31mOut[22:6]: [0m255.8632962531706

[0;31mOut[23:6]: [0m255.8632962531706

[0;31mOut[24:6]: [0m255.8632962531706

[0;31mOut[25:6]: [0m255.8632962531706

[0;31mOut[26:6]: [0m255.8632962531706

[0;31mOut[27:6]: [0m255.8632962531706

[0;31mOut[28:6]: [0m255.8632962531706

[0;31mOut[29:6]: [0m255.8632962531706

[0;31mOut[30:6]: [0m255.8632962531706

[0;31mOut[31:6]: [0m255.8632962531706

[0;31mOut[32:6]: [0m255.8632962531706

[0;31mOut[33:6]: [0m255.8632962531706

[0;31mOut[34:6]: [0m255.8632962531706

[0;31mOut[35:6]: [0m255.8632962531706

[0;31mOut[36:6]: [0m255.8632962531706

[0;31mOut[37:6]: [0m255.8632962531706

[0;31mOut[38:6]: [0m255.8632962531706

[0;31mOut[39:6]: [0m255.8632962531706

[0;31mOut[40:6]: [0m255.8632962531706

[0;31mOut[41:6]: [0m255.8632962531706

[0;31mOut[42:6]: [0m255.8632962531706

[0;31mOut[43:6]: [0m255.8632962531706

[0;31mOut[44:6]: [0m255.8632962531706

[0;31mOut[45:6]: [0m255.8632962531706

[0;31mOut[46:6]: [0m255.8632962531706

[0;31mOut[47:6]: [0m255.8632962531706

[0;31mOut[48:6]: [0m255.8632962531706

[0;31mOut[49:6]: [0m255.8632962531706

[0;31mOut[50:6]: [0m255.8632962531706

[0;31mOut[51:6]: [0m255.8632962531706

[0;31mOut[52:6]: [0m255.8632962531706

[0;31mOut[53:6]: [0m255.8632962531706

[0;31mOut[54:6]: [0m255.8632962531706

[0;31mOut[55:6]: [0m255.8632962531706

[0;31mOut[56:6]: [0m255.8632962531706

[0;31mOut[57:6]: [0m255.8632962531706

[0;31mOut[58:6]: [0m255.8632962531706

[0;31mOut[59:6]: [0m255.8632962531706

[0;31mOut[60:6]: [0m255.8632962531706

[0;31mOut[61:6]: [0m255.8632962531706

[0;31mOut[62:6]: [0m255.8632962531706

[0;31mOut[63:6]: [0m255.8632962531706

You can also do the same thing [with MPI](MPI%20Broadcast.ipynb).